package com.superbit.service.mq.define;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Repository;

import com.superbit.core.exception.ServerErrorException;
import com.superbit.service.mq.dao.ServiceStatusListener;



@Repository(value="MQSessionKeeper")
public class ActiveMQSessionKeeper implements TransportListener,ApplicationListener<ContextRefreshedEvent>{
	private Log logger = LogFactory.getLog(this.getClass());
	
	@Resource(name="amqConnectionFactory")
	private ActiveMQConnectionFactory connectionFactory;
	
	@Autowired
	private ServiceStatusListener serviceStatusListener;
	
	//Session
	private Session activeMqSession;
	//Connection
	private ActiveMQConnection connection; 
	//Producers
	private Map<String,MessageProducer> producers = new HashMap<String,MessageProducer>();
	//是否连接中断
	private boolean isTransportInterupt;
	
	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		initSession();
	}
	
	//放到Spring加载完成后执行
	private void initSession(){
		try{
			connection = (ActiveMQConnection)connectionFactory.createConnection();
			connection.addTransportListener(this);
			connection.start();
			activeMqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		}catch(Exception e){
			isTransportInterupt = true;
			serviceStatusListener.setMQWorkingStates(false);
			logger.error("获取Session失败");
		}
	}
	
	@PreDestroy
	public void destorySession(){
		if(activeMqSession!=null){
			try {
				activeMqSession.close();
			} catch (JMSException e) {
				logger.error("关闭Session失败");
			}
		}
		if(connection!=null){
			try {
				connection.close();
			} catch (JMSException e) {
				logger.error("Connection失败");
			}
		}
	}
	
	public Session getActiveSession() {
		if(isTransportInterupt){
			return null;
		}
		if(activeMqSession==null){
			System.err.println("生成session");
			try {
				activeMqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			} catch (JMSException e) {
				isTransportInterupt = true;
				serviceStatusListener.setMQWorkingStates(false);
				return null;
			}
		}
		return activeMqSession;
	}

	public void sendOrderMessage(String destinationName, Message message) {
		try{
			MessageProducer messageProducer = this.producers.get(destinationName);
			if(messageProducer==null){
				Destination destination = activeMqSession.createQueue(destinationName);
				messageProducer = activeMqSession.createProducer(destination);
				messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
				producers.put(destinationName, messageProducer);
			}
			messageProducer.send(message);
		}catch(JMSException e){
			throw new ServerErrorException("MessageBuild Error", null);
		}
		
	}
	
	@Override
	public void transportInterupted() {
		System.out.println("链接中断");
		isTransportInterupt = true;
		serviceStatusListener.setMQWorkingStates(false);
	}
	@Override
	public void transportResumed() {
		System.out.println("连接成功");
		isTransportInterupt = false;
		serviceStatusListener.setMQWorkingStates(true);
	}
	@Override
	public void onCommand(Object command) {
		System.out.println("command:"+command);
	}
	@Override
	public void onException(IOException error) {
		System.out.println("error:"+error.getMessage());
	}
}
